package com.chenjl.trace.stream;

import com.chenjl.trace.stream.topology.SentencesRichSpout;
import com.chenjl.trace.stream.topology.WordCountRichBolt;
import com.chenjl.trace.stream.topology.WordSplitRichBolt;
import com.chenjl.trace.stream.util.Constant;

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;
import lombok.extern.slf4j.Slf4j;
/**
 * Entry point for the full-link (end-to-end) trace monitoring stream computation.
 * Wires a sentence spout to a word-split bolt and a word-count bolt, then submits
 * the resulting topology through {@link StormSubmitter}.
 * 2019-1-10 15:45:33
 * @author chenjinlong
 */
@Slf4j
public class TraceStreamApp {

	/**
	 * Builds the topology and its configuration and submits them under
	 * {@link Constant#TOPOLOGY_NAME}.
	 *
	 * @param args command-line arguments; not read by this method
	 * @throws AlreadyAliveException propagated from the submit call
	 * @throws InvalidTopologyException propagated from the submit call
	 */
	public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
		log.info("streamApp begin~~~");

		TopologyBuilder builder = wireTopology();
		Config stormConf = buildConfig();
		StormSubmitter.submitTopology(Constant.TOPOLOGY_NAME, stormConf, builder.createTopology());

		log.info("streamApp end~~~");
	}

	/**
	 * Registers the spout and both bolts on a fresh builder.
	 * Each bolt subscribes to its upstream component via localOrShuffleGrouping.
	 *
	 * @return the populated builder; the caller invokes createTopology() on it
	 */
	private static TopologyBuilder wireTopology() {
		TopologyBuilder builder = new TopologyBuilder();

		// Source component, registered with the spout parallelism hint.
		builder.setSpout(
				Constant.SENTENCESRICHSPOUT_ID,
				new SentencesRichSpout(),
				Constant.spoutParal);

		// Split bolt consumes the spout's stream.
		builder
				.setBolt(Constant.WORDSPLITRICHBOLT_ID, new WordSplitRichBolt(), Constant.boltParal)
				.localOrShuffleGrouping(Constant.SENTENCESRICHSPOUT_ID);

		// Count bolt consumes the split bolt's stream.
		builder
				.setBolt(Constant.WORDCOUNTRICHBOLT_ID, new WordCountRichBolt(), Constant.boltParal)
				.localOrShuffleGrouping(Constant.WORDSPLITRICHBOLT_ID);

		return builder;
	}

	/**
	 * Creates the configuration passed to the submit call: debug switched on and
	 * the worker count taken from {@link Constant#WORKER_NUM}.
	 *
	 * @return the configuration for this topology
	 */
	private static Config buildConfig() {
		Config stormConf = new Config();
		stormConf.setDebug(true);
		stormConf.setNumWorkers(Constant.WORKER_NUM);
		return stormConf;
	}
}